﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

using iTool.Cloud.Database.iToolService;
using iTool.Cloud.Database.KeyGeneratorProvider.Contract;
using iTool.Cloud.Database.MaxVersionPlacementStrategyFixedProvider;
using iTool.Cloud.Database.Options;
using iTool.ClusterComponent;
using iTool.SQL.AIHandle;

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Options;

using Orleans.Concurrency;
using Orleans.Configuration;
using Orleans.Runtime;

namespace iTool.Cloud.Database.ServiceProvider.Implementation
{

    /// <summary>
    /// 以Table 为主题
    /// </summary>
    [Reentrant]
    [MaxVersionPlacementStrategy]
    public class TableExecuteService : iToolServiceDataBase, iTableExecuteService
    {
        readonly Subject<BatchItem<IList<AnalysisExecuteItemOptions>>> _subject;
        readonly ISiloStatusOracle siloStatusOracle;
        int TimerLoopCount = 0; // 100ms = 1

        public TableExecuteService(KeyGeneratorOptions options, IOptions<ClusterOptions> clusterOptions, ISiloStatusOracle siloStatusOracle) : base(options, clusterOptions, siloStatusOracle)
        {
            this.siloStatusOracle = siloStatusOracle;
            _subject = new Subject<BatchItem<IList<AnalysisExecuteItemOptions>>>();
            // 60毫秒 或者 200 条
            _subject.Buffer(TimeSpan.FromMilliseconds(60), 100)
                .Where(x => x.Count > 0)
                .Select(list => Observable.FromAsync(() => BatchExecuteNonQueryAsync(list)))
                .Concat()
                .Subscribe();
        }

        public override async Task OnActivateAsync()
        {
            await base.OnActivateAsync();
            TimerEventHandel += async (obj, sender) =>
            {
                if (TimerLoopCount > 100) // > 10s
                {
                    if (TransactionExecLoggers?.Any() == true)
                    {
                        var tranids = TransactionExecLoggers.GroupBy(item => item.tranId).Select(item => item.Key).ToArray();
                        foreach (var tranid in tranids)
                        {
                            await iTransaction.DisposeAsync(tranid);
                        }
                        TransactionExecLoggers?.Clear();
                        TransactionExecLoggers = null;
                    }
                }
                else if (TransactionExecLoggers?.Any() == true)
                {
                    TimerLoopCount++;
                }
            };
        }

        public Task<string> GetRegisterServiceAsync()
        {
            return Task.FromResult(siloStatusOracle.SiloAddress.ToParsableString());
        }

        #region iTransactionService
        List<ExecLogger>? TransactionExecLoggers;
        public async Task ExecuteTransactionAsync(List<AnalysisExecuteItemOptions> executeItem, long tranId)
        {
            try
            {
                iTransaction.CheckTransaction(tranId);

                TimerLoopCount = 0;
                TransactionExecLoggers = TransactionExecLoggers ?? new List<ExecLogger>();
                foreach (var item in executeItem)
                {
                    await GetFormatSqlAsync(item);
                    long insertKey = 0;
                    if (item.Action == SQLActionOptions.INSERT)
                    {
                        insertKey = NextKeyOfLong();
                        item.Parameters[^1] = new SqliteParameter("@B_IDX", insertKey);
                    }

                    await using (var command = new SqliteCommand(item.QuerySql, iTransaction.GetConnection(tranId)))
                    {
                        command.Transaction = iTransaction.GetSqliteTransaction(tranId);
                        if (item.Parameters.Any())
                            command.Parameters.AddRange(item.Parameters);

                        int rows = await command.ExecuteNonQueryAsync();
                        if (rows > 0)
                        {
                            long[] keys = insertKey > 0 ? new long[] { insertKey } : item.Keys;
                            TransactionExecLoggers.Add(new ExecLogger(item.TableName, item.Action, keys, item.QuerySql, item.SetFields.Any() ? string.Join(',', item.SetFields) : string.Empty, tranId));
                        }
                    }
                }
            }
            catch (Exception)
            {
                TransactionExecLoggers?.Clear();
                TransactionExecLoggers = null;
                throw;
            }
        }
        public async Task ComplatedTransactionAsync()
        {
            try
            {
                if (TransactionExecLoggers != null)
                {
                    await AddExecLoggerAsync(TransactionExecLoggers);
                }
            }
            finally
            {
                if (TransactionExecLoggers != null)
                {
                    TransactionExecLoggers.Clear();
                    TransactionExecLoggers = null;
                }
            }
        }
        #endregion

        public async Task<long> ExecuteNonQueryAsync(AnalysisExecuteItemOptions executeItem)
        {
            Console.WriteLine(executeItem.Sql);
            await GetFormatSqlAsync(executeItem);
            List<ExecLogger> execLoggers = new List<ExecLogger>();
            await using (var connection = await GetConnectionAsync(executeItem.CustomFunctions))
            {
                await connection.OpenAsync();
                long insertKey = 0, rows = 0;
                if (executeItem.Action == SQLActionOptions.INSERT)
                {
                    insertKey = NextKeyOfLong();
                    executeItem.Parameters[^1].Value = insertKey;
                }

                await using (var command = new SqliteCommand(executeItem.QuerySql, connection))
                {
                    if (executeItem.Parameters.Any())
                        command.Parameters.AddRange(executeItem.Parameters);

                    rows = await command.ExecuteNonQueryAsync();
                    if (rows > 0)
                    {
                        long[] keys = insertKey > 0 ? new long[] { insertKey } : executeItem.Keys;
                        execLoggers.Add(new ExecLogger(executeItem.TableName, executeItem.Action, keys, executeItem.QuerySql, executeItem.SetFields.Any() ? string.Join(',', executeItem.SetFields) : string.Empty, 0));
                    }
                }
                await AddExecLoggerAsync(execLoggers);
                return rows;
            }
        }

        public async Task ExecuteNonQueryNoResultAsync(AnalysisExecuteItemOptions executeItem)
        {
            await GetFormatSqlAsync(executeItem);
            var batchItem = new BatchItem<IList<AnalysisExecuteItemOptions>>(new List<AnalysisExecuteItemOptions> { executeItem });
            _subject.OnNext(batchItem);
            await batchItem.TaskSource.Task;
        }

        public async Task BatchExecuteNonQueryAsync(List<AnalysisExecuteItemOptions> executeItems)
        {
            foreach (var item in executeItems)
            {
                await GetFormatSqlAsync(item);
            }

            var batchItem = new BatchItem<IList<AnalysisExecuteItemOptions>>(executeItems);
            _subject.OnNext(batchItem);
            await batchItem.TaskSource.Task;
        }

        async Task BatchExecuteNonQueryAsync(IList<BatchItem<IList<AnalysisExecuteItemOptions>>> messages)
        {
            try
            {
                List<ExecLogger> execLoggers = new List<ExecLogger>();
                await using (var connection = await GetConnectionAsync(new List<string> { "distance" }))
                {
                    await connection.OpenAsync();
                    var transaction = await connection.BeginTransactionAsync(System.Data.IsolationLevel.ReadUncommitted);
                    try
                    {
                        foreach (var batchItem in messages)
                        {
                            foreach (var item in batchItem.Body)
                            {
                                long insertKey = 0;
                                if (item.Action == SQLActionOptions.INSERT)
                                {
                                    insertKey = NextKeyOfLong();
                                    item.Parameters[^1].Value = insertKey;
                                }

                                await using (var command = new SqliteCommand(item.QuerySql, connection))
                                {
                                    command.Transaction = (SqliteTransaction)transaction;

                                    if (item.Parameters.Any())
                                        command.Parameters.AddRange(item.Parameters);

                                    int rows = await command.ExecuteNonQueryAsync();
                                    if (rows > 0)
                                    {
                                        long[] keys = insertKey > 0 ? new long[] { insertKey } : item.Keys;
                                        execLoggers.Add(new ExecLogger(item.TableName, item.Action, keys, item.QuerySql, item.SetFields.Any() ? string.Join(',', item.SetFields) : string.Empty, 0));
                                    }
                                }
                            }
                        }

                        await transaction.CommitAsync();
                        await AddExecLoggerAsync(execLoggers);

                        foreach (var item in messages)
                            item.TaskSource.TrySetResult(Task.CompletedTask);
                    }
                    catch (Exception ex)
                    {
                        await transaction.RollbackAsync();
                        foreach (var item in messages)
                            item.TaskSource.TrySetException(ex);
                    }
                    finally
                    {
                        await transaction.DisposeAsync();
                    }
                }
            }
            catch (Exception ex)
            {
                foreach (var item in messages)
                    item.TaskSource.TrySetException(ex);
            }
        }
        
        // TUDO 同步索引输入数据量大可能会导致 长时间的写操作停顿，这是不能接受的。 后续需要改进
        public async Task SyncIndexFieldsAsync(List<string>? locations, List<string>? fields)
        {
            await this.IsNeedReBuildIndexByFieldsAsync(locations, fields?.ToArray());
        }
    }
}
